fix(bigquery-driver): forward stream errors and propagate consumer cancellation via pipeline#10877
tlangton3 wants to merge 1 commit into
`BigQueryDriver.stream` was wiring its returned `rowStream` to the
underlying `@google-cloud/bigquery` source stream via `stream.pipe()`.
By design, Node's `pipe()` forwards `data` and `end` events but NOT
`error` events. So when BigQuery returned an HTTP error mid-stream
(e.g. type-coercion rejections like `No matching signature for
operator = for argument types: TIMESTAMP, DATE`), the source stream's
`'error'` event had no listener — `processTicksAndRejections` fired on
Node's tick queue and killed the cubejs-server process. Every BI
session connected to that pod was torn down, with `server closed the
connection unexpectedly` over the wire and the actual BigQuery error
visible only in the container's stderr.
The non-streaming path (`driver.query`) is unaffected because the
rejection propagates through `await` and is caught by the cube native
bridge's top-level `try/catch`. The streaming path returns the
`rowStream` synchronously, so `await` resolves before the HTTP call
fires — the bridge has no chance to catch the rejection.
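The failure mode can be reproduced without BigQuery at all. A minimal sketch (not driver code; all names here are illustrative) showing that `pipe()` leaves the destination unaware of source errors:

```typescript
import { PassThrough } from 'node:stream';

// Stand-ins for the BigQuery HTTP source and the driver's rowStream.
const source = new PassThrough();
const dest = new PassThrough();
source.pipe(dest);

let destSawError = false;
dest.on('error', () => { destSawError = true; });

// In the unpatched driver there is no listener here, so the 'error'
// event is uncaught and the process dies. The listener below exists
// only so this sketch can run to completion.
source.on('error', () => {});
source.emit('error', new Error('mid-stream HTTP failure'));

console.log('destination saw the error:', destSawError); // → false
```

With no `'error'` listener on `source`, Node's default behaviour is to throw the error as an uncaught exception — the crash described above.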
Switching to `stream.pipeline` fixes both observed defects:
(a) source-stream errors are auto-forwarded by destroying `rowStream`
with the same error, so the bridge's `rowStream.on('error', ...)`
handler fires and the wire layer emits a structured Postgres
`ErrorResponse` (SQLSTATE XX000) carrying the verbatim BigQuery
message;
(b) consumer-side destruction of `rowStream` (client cancellation,
severed BI session) now destroys the source too, preventing the
driver from paging results into the void after the consumer has
gone away.
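The shape of the change can be sketched as follows (simplified; `wireRowStream` is an illustrative helper, not the actual driver method, which builds `rowStream` from query results):

```typescript
import { pipeline, PassThrough, Readable } from 'node:stream';

// Hypothetical helper mirroring the fix: wire a source stream to the
// rowStream handed back to the bridge.
function wireRowStream(source: Readable): PassThrough {
  const rowStream = new PassThrough({ objectMode: true });

  // Before: source.pipe(rowStream) — source errors had no listener.
  // After: pipeline destroys rowStream with the source's error (so the
  // bridge's 'error' handler fires), and destroying rowStream tears
  // down the source as well.
  pipeline(source, rowStream, () => {
    // Intentionally empty: pipeline has already destroyed the streams;
    // the callback only absorbs the terminal error/success signal.
  });

  return rowStream; // still returned synchronously
}
```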
Verified end-to-end against real BigQuery via the SQL API + psql:
both the success path (100,000 rows streamed cleanly) and the failure
path (BigQuery TIMESTAMP=DATE rejection surfaced as XX000 with the
verbatim message) now behave correctly; the cube container stays up.
Adds two synthetic-source unit tests (`BigQueryDriverStreamError.test`)
verifying both directions of the lifecycle propagation. Without the
fix, both tests time out, proving they catch the regression.
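The two tests can be sketched roughly like this (an assumed shape — the actual test file may differ; `wire` stands in for invoking the patched driver wiring):

```typescript
import { pipeline, PassThrough } from 'node:stream';

// Stand-in for the patched driver wiring under test.
function wire(source: PassThrough): PassThrough {
  const rowStream = new PassThrough({ objectMode: true });
  pipeline(source, rowStream, () => {});
  return rowStream;
}

// Direction 1: a source error must reach the returned rowStream.
const source1 = new PassThrough();
const rowStream1 = wire(source1);
rowStream1.on('error', (err) => {
  console.log('forwarded to rowStream:', err.message);
});
source1.destroy(new Error('synthetic mid-stream failure'));

// Direction 2: destroying rowStream must destroy the source.
const source2 = new PassThrough();
const rowStream2 = wire(source2);
rowStream2.destroy();
setImmediate(() => {
  console.log('source destroyed:', source2.destroyed);
});
```

With bare `pipe()` neither event ever fires, so tests awaiting them hang until the test-runner timeout — matching the "both tests time out" behaviour above.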
Fixes cube-js#10875.
Summary
`BigQueryDriver.stream` wires its returned `rowStream` to the underlying `@google-cloud/bigquery` source stream via `stream.pipe(rowStream)`. By design, Node's `pipe()` forwards `data`/`end` events but not `error` events. When BigQuery returns an HTTP error mid-stream (e.g. a type-coercion rejection like `No matching signature for operator = for argument types: TIMESTAMP, DATE`), the source stream's `'error'` event has no listener — Node's default `'error'` handling fires (`processTicksAndRejections`) and the cubejs-server process exits with code 1. Every BI session connected to that pod is torn down with `server closed the connection unexpectedly`.

The non-streaming path (`driver.query`) is unaffected because the rejection propagates through `await` and is caught by the cube native bridge's top-level `try/catch`. The streaming path returns the `rowStream` synchronously, so `await` resolves before the HTTP call fires — the bridge has no chance to catch the rejection.

Fixes #10875.
Changes
Switches `BigQueryDriver.stream` from `stream.pipe(rowStream)` to `stream.pipeline(stream, rowStream, () => {})` from `node:stream`. This fixes both observed defects:

- `pipeline` auto-destroys `rowStream` with the underlying error → the bridge's `rowStream.on('error', ...)` handler fires → the wire layer emits a structured Postgres `ErrorResponse` (XX000) carrying the verbatim BigQuery message.
- Consumer-side destruction of `rowStream` (client cancellation, severed BI session) now destroys the source too, preventing the driver from paging results into the void after the consumer has gone away.

Implementation Details
`pipeline()` has been in Node's stable API since v10 (callback form); Cube's Node engine constraint is well clear of this. The callback is intentionally empty: `pipeline` already destroys `rowStream` with the error, and the callback exists solely to satisfy the signature and prevent an unhandled rejection inside `pipeline` itself. `stream/promises.pipeline` is the wrong choice here — `await`ing it would block returning `rowStream` until the entire query completes, buffering the whole result set in memory. The callback form keeps the existing synchronous-return contract.

Testing
Unit tests (new — `packages/cubejs-bigquery-driver/test/BigQueryDriverStreamError.test.ts`)

Two synthetic-source tests using `PassThrough` streams (no real BigQuery needed):

- *forwards source-stream errors to the returned rowStream* — emits an error on a mock source and asserts `rowStream` receives it.
- *propagates rowStream destruction back to the source stream* — calls `rowStream.destroy()` and asserts the mock source's `destroyed` flag is set.

Both tests time out when the fix is reverted to bare `stream.pipe()`, proving they catch the regression.

End-to-end verification against real BigQuery
Verified via the cube SQL API + psql (cube v1.6.46 with patched `BigQueryDriver.js` overlaid into a running container):

- Success path: 100,000 rows streamed cleanly.
- Failure path, before the fix: the `TIMESTAMP=DATE` error killed the container and clients saw `server closed the connection unexpectedly`.
- Failure path, after the fix: clients receive `ERROR: XX000: Database Execution Error: No matching signature for operator = ... Signature: T1 = T1 ...`; the container stays alive.

Wire-level behaviour is now identical to the non-streaming SQL API (same `XX000` SQLSTATE, same `pg.DatabaseError` shape, same verbatim BigQuery message reaching the client).

Compatibility & risk
`stream.pipeline` is the textbook Node primitive for this exact wiring. No new dependencies.
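To illustrate the point above about preferring the callback form over `stream/promises`, a sketch with illustrative names (not the driver's actual code):

```typescript
import { pipeline, PassThrough, Readable } from 'node:stream';
import { pipeline as pipelineAsync } from 'node:stream/promises';

// Callback form: wiring is set up and rowStream is returned immediately,
// before any data has flowed — the consumer streams rows incrementally.
function streamWithCallback(source: Readable): PassThrough {
  const rowStream = new PassThrough({ objectMode: true });
  pipeline(source, rowStream, () => {}); // non-blocking
  return rowStream;
}

// Promise form: the `await` settles only once the whole pipeline has
// completed, so the caller would not receive rowStream until the query
// had finished — with rows piling up in the unread buffer meanwhile.
async function streamWithAwait(source: Readable): Promise<PassThrough> {
  const rowStream = new PassThrough({ objectMode: true });
  await pipelineAsync(source, rowStream); // settles at end-of-stream
  return rowStream;
}
```

The callback form is what preserves the driver's synchronous-return contract; `streamWithAwait` is shown only as the rejected alternative.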